---
title: "Temporal Integration"
description: "Build durable, scalable agent workflows with Temporal orchestration"
---

<Info>
  Temporal provides durable execution for your agent workflows, enabling automatic retries, pause/resume capabilities, and time-travel debugging, which makes it well suited to production deployments.
</Info>

## Why Temporal?

mcp-agent supports both `asyncio` and `temporal` execution engines. While asyncio works great for development and simple workflows, Temporal is recommended for production deployments because it provides:

<CardGroup cols={2}>
  <Card title="Durable Execution" icon="shield">
    Workflows survive failures, restarts, and infrastructure issues
  </Card>
  <Card title="Automatic Retries" icon="rotate">
    Failed activities are automatically retried with configurable policies
  </Card>
  <Card title="Pause & Resume" icon="pause">
    Workflows can be paused indefinitely and resumed with new data
  </Card>
  <Card title="Observability" icon="magnifying-glass">
    Complete workflow history and time-travel debugging via Temporal UI
  </Card>
  <Card title="Scalability" icon="chart-line">
    Distribute workflow execution across multiple workers
  </Card>
  <Card title="Long-Running Workflows" icon="clock">
    Support for workflows that run for days, weeks, or months
  </Card>
</CardGroup>

## Quick Start

<Steps>
  <Step title="Install Temporal CLI">
    Install the Temporal CLI for local development:
    ```bash
    # macOS
    brew install temporal
    
    # Linux/WSL
    curl -sSf https://temporal.download/cli.sh | sh
    
    # Windows
    # Download from https://github.com/temporalio/cli/releases
    ```
  </Step>
  
  <Step title="Start Temporal Server">
    Run a local Temporal server for development:
    ```bash
    temporal server start-dev
    ```
    
    This starts:
    - Temporal Server on `localhost:7233`
    - Web UI on `http://localhost:8233`
  </Step>
  
  <Step title="Configure mcp-agent">
    Update your `mcp_agent.config.yaml`:
    ```yaml
    execution_engine: temporal

    # Optional: preload modules that register @workflow_task activities
    workflow_task_modules:
      - my_package.custom_temporal_tasks

    # Optional: override retry behaviour for specific workflow tasks/activities
    workflow_task_retry_policies:
      my_package.custom_temporal_tasks.my_activity:
        maximum_attempts: 1

    temporal:
      host: localhost
      port: 7233
      namespace: default
      task_queue: mcp-agent
      max_concurrent_activities: 10
    ```
    `mcp-agent` automatically preloads the workflow task modules for its built-in LLM providers.
    Add extra modules under `workflow_task_modules` when you register custom `@workflow_task`
    activities outside the core packages, so the worker can discover them before it starts.
    Entries are standard Python import paths.

    The optional `workflow_task_retry_policies` mapping lets you tune Temporal retry behaviour
    per activity. Keys can be exact activity names, prefix wildcards such as `prefix*`, or `*`
    to match every activity.

    For provider SDKs, common non-retryable error types include:

    - OpenAI/Azure OpenAI: `AuthenticationError`, `PermissionDeniedError`, `BadRequestError`, `NotFoundError`, `UnprocessableEntityError`.
    - Anthropic: `AuthenticationError`, `PermissionDeniedError`, `BadRequestError`, `NotFoundError`, `UnprocessableEntityError`.
    - Azure AI Inference: `HttpResponseError` (400/401/403/404/422).
    - Google GenAI: `InvalidArgument`, `FailedPrecondition`, `PermissionDenied`, `NotFound`, `Unauthenticated`.

    For these errors, mcp-agent raises a `WorkflowApplicationError` so that Temporal (or the asyncio executor) does not retry them in a loop. This works even when the Temporal SDK is not installed locally.
  </Step>
  
  <Step title="Create Worker">
    Create a worker to process workflows:
    ```python worker.py
    import asyncio
    from mcp_agent.app import MCPApp
    from mcp_agent.executor.workflow import Workflow, WorkflowResult
    from mcp_agent.executor.temporal import create_temporal_worker_for_app
    
    app = MCPApp(name="my_agent")
    
    # Define your workflows here
    @app.workflow
    class MyWorkflow(Workflow[str]):
        @app.workflow_run
        async def run(self, input: str) -> WorkflowResult[str]:
            return WorkflowResult(value=f"Processed: {input}")
    
    async def main():
        async with create_temporal_worker_for_app(app) as worker:
            await worker.run()
    
    if __name__ == "__main__":
        asyncio.run(main())
    ```
  </Step>
  
  <Step title="Run Workflow">
    With the worker from the previous step running in another terminal, start the workflow:
    ```python main.py
    import asyncio

    # Reuse the app defined in worker.py so that MyWorkflow is registered
    from worker import app
    
    async def main():
        async with app.run() as agent_app:
            executor = agent_app.executor
            
            # Start workflow
            handle = await executor.start_workflow(
                "MyWorkflow",
                "Hello Temporal!"
            )
            
            # Wait for result
            result = await handle.result()
            print(f"Result: {result}")
    
    if __name__ == "__main__":
        asyncio.run(main())
    ```
  </Step>
</Steps>
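The configuration step says that `workflow_task_retry_policies` keys can be exact activity names, `prefix*` wildcards, or `*`. The snippet below is a hypothetical, stdlib-only sketch of how such a lookup could resolve. The precedence it uses (exact name, then the longest matching prefix, then `*`) is an assumption rather than documented mcp-agent behaviour, and `resolve_retry_policy` is not part of mcp-agent.

```python
from typing import Dict, Optional


def resolve_retry_policy(activity_name: str, policies: Dict[str, dict]) -> Optional[dict]:
    """Return the retry override for an activity (assumed precedence, see above)."""
    # 1. An exact activity name wins
    if activity_name in policies:
        return policies[activity_name]
    # 2. Otherwise the longest matching "prefix*" pattern wins
    prefix_keys = [
        key
        for key in policies
        if key.endswith("*") and key != "*" and activity_name.startswith(key[:-1])
    ]
    if prefix_keys:
        return policies[max(prefix_keys, key=len)]
    # 3. Finally fall back to the catch-all "*" (None if it is absent)
    return policies.get("*")


policies = {
    "my_package.custom_temporal_tasks.my_activity": {"maximum_attempts": 1},
    "my_package.custom_temporal_tasks.*": {"maximum_attempts": 3},
    "*": {"maximum_attempts": 5},
}

exact = resolve_retry_policy("my_package.custom_temporal_tasks.my_activity", policies)
by_prefix = resolve_retry_policy("my_package.custom_temporal_tasks.other_activity", policies)
fallback = resolve_retry_policy("some_provider.completion_task", policies)
```

Keeping the catch-all last means a broad `*` default never overrides a more specific entry.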

## Temporal Architecture

### Core Components

Temporal's architecture provides robust workflow orchestration through several key components:

<CardGroup cols={2}>
  <Card title="Temporal Server" icon="server">
    Manages workflow state, persists event history, and coordinates execution
  </Card>
  <Card title="Workers" icon="cogs">
    Execute workflow and activity code, poll for tasks from the server
  </Card>
  <Card title="Event Store" icon="database">
    Immutable log of all workflow events, enabling replay and fault tolerance
  </Card>
  <Card title="Task Queues" icon="list">
    Distribute work between server and workers, enabling load balancing
  </Card>
</CardGroup>

### Benefits of Temporal Architecture

**Durability & Fault Tolerance:**
Temporal's event sourcing model ensures that every workflow step is persisted. If a worker crashes, another worker can pick up where it left off by replaying the event history.

```python
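# Completed activity results are recorded in the workflow's event history
@app.workflow
class ResilientWorkflow(Workflow[dict]):
    @app.workflow_run
    async def run(self, data: dict) -> WorkflowResult[dict]:
        # process_step_1, validate_step_2 and finalize_step_3 stand in for your
        # own steps; each must run as an activity (for example an LLM call)
        # for its result to be recorded in history.

        # Step 1: Process data (result recorded once it completes)
        result1 = await self.process_step_1(data)

        # Step 2: Validate results (result recorded once it completes)
        result2 = await self.validate_step_2(result1)

        # Step 3: Finalize
        # If the worker crashes here, another worker replays the history:
        # steps 1 and 2 return their recorded results instead of running
        # again, and execution continues with step 3.
        result3 = await self.finalize_step_3(result2)

        return WorkflowResult(value=result3)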
```
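To see why completed steps are not repeated, here is a toy, in-memory model of event-sourced replay. It is not how Temporal is implemented (the real history lives on the server and records many more event types); `ToyHistory`, `ToyWorkflowContext`, and `resilient_workflow` are hypothetical names used only for illustration.

```python
from typing import Any, Callable, Dict, List


class ToyHistory:
    """Stand-in for the event history: activity results keyed by sequence number."""

    def __init__(self) -> None:
        self.results: Dict[int, Any] = {}


class ToyWorkflowContext:
    """Runs activities, reusing recorded results when replaying."""

    def __init__(self, history: ToyHistory) -> None:
        self.history = history
        self.seq = 0
        self.executed: List[str] = []  # activities that actually ran in this attempt

    def execute_activity(self, name: str, fn: Callable[[Any], Any], arg: Any) -> Any:
        self.seq += 1
        if self.seq in self.history.results:
            # Replay: return the recorded result without re-running the activity
            return self.history.results[self.seq]
        result = fn(arg)
        self.history.results[self.seq] = result  # record before moving on
        self.executed.append(name)
        return result


def resilient_workflow(ctx: ToyWorkflowContext, data: List[str], crash_after_step_2: bool = False) -> List[str]:
    r1 = ctx.execute_activity("process_step_1", lambda d: d + ["processed"], data)
    r2 = ctx.execute_activity("validate_step_2", lambda d: d + ["validated"], r1)
    if crash_after_step_2:
        raise RuntimeError("worker crashed")
    return ctx.execute_activity("finalize_step_3", lambda d: d + ["finalized"], r2)


history = ToyHistory()

# First worker: completes steps 1 and 2, then crashes
first = ToyWorkflowContext(history)
try:
    resilient_workflow(first, [], crash_after_step_2=True)
except RuntimeError:
    pass

# Second worker: replays steps 1 and 2 from history and only runs step 3
second = ToyWorkflowContext(history)
result = resilient_workflow(second, [])
```

Because replay re-executes the workflow code itself, the code has to make the same calls in the same order each time, which is what the determinism rules below protect.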

**Automatic Retries & Exponential Backoff:**
Temporal handles activity failures with configurable retry policies:

```python
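from datetime import timedelta

from temporalio import activity, workflow
from temporalio.common import RetryPolicy

from mcp_agent.agents.agent import Agent
from mcp_agent.executor.workflow import Workflow, WorkflowResult
from mcp_agent.workflows.llm.augmented_llm_openai import OpenAIAugmentedLLM


# Temporal activities are standalone functions decorated with @activity.defn;
# they must be registered on the worker that polls this workflow's task queue.
@activity.defn
async def unreliable_activity(data: str) -> str:
    """Activity that might fail and needs retries."""
    # Simulate an unreliable external API call
    agent = Agent(name="api_caller", server_names=["http"])
    async with agent:
        llm = await agent.attach_llm(OpenAIAugmentedLLM)
        return await llm.generate_str(f"Process via API: {data}")


@app.workflow
class RetryWorkflow(Workflow[str]):
    @app.workflow_run
    async def run(self, input: str) -> WorkflowResult[str]:
        # Retry after 1s, 2s, 4s, ... (capped at 5 minutes), up to 10 attempts;
        # errors of type ValidationError fail immediately without retrying
        retry_policy = RetryPolicy(
            initial_interval=timedelta(seconds=1),
            maximum_interval=timedelta(minutes=5),
            backoff_coefficient=2.0,
            maximum_attempts=10,
            non_retryable_error_types=["ValidationError"],
        )

        # Temporal retries the activity automatically according to the policy
        result = await workflow.execute_activity(
            unreliable_activity,
            input,
            start_to_close_timeout=timedelta(minutes=5),
            retry_policy=retry_policy,
        )

        return WorkflowResult(value=result)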
```
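The policy above spaces retries by multiplying the wait by `backoff_coefficient` after each failure, capped at `maximum_interval`. With these values the waits before attempts 2 through 10 are 1, 2, 4, ..., 256 seconds, so the 5-minute cap is never reached. The stdlib-only sketch below models that schedule and the non-retryable short-circuit. `ToyRetryPolicy` and `call_with_retries` are hypothetical and simplified; in a real deployment Temporal, not your code, schedules the retries.

```python
from dataclasses import dataclass
from typing import Callable, List, Tuple, TypeVar

T = TypeVar("T")


@dataclass
class ToyRetryPolicy:
    initial_interval: float = 1.0  # seconds
    backoff_coefficient: float = 2.0
    maximum_interval: float = 300.0  # 5 minutes
    maximum_attempts: int = 10  # this toy needs >= 1; in Temporal, 0 means unlimited
    non_retryable_error_types: Tuple[str, ...] = ("ValidationError",)

    def delays(self) -> List[float]:
        """Wait before each retry, i.e. before attempts 2..maximum_attempts."""
        return [
            min(self.initial_interval * self.backoff_coefficient ** n, self.maximum_interval)
            for n in range(self.maximum_attempts - 1)
        ]


def call_with_retries(fn: Callable[[], T], policy: ToyRetryPolicy, sleep: Callable[[float], None]) -> T:
    delays = policy.delays()
    for attempt in range(policy.maximum_attempts):
        try:
            return fn()
        except Exception as exc:
            # Fail fast on non-retryable types (matched by class name) or when out of attempts
            if type(exc).__name__ in policy.non_retryable_error_types or attempt == policy.maximum_attempts - 1:
                raise
            sleep(delays[attempt])
    raise ValueError("maximum_attempts must be at least 1")


class ValidationError(Exception):
    pass


# A call that fails twice with a transient error, then succeeds
outcomes = iter([ConnectionError("flaky"), ConnectionError("flaky"), "ok"])


def flaky_call() -> str:
    item = next(outcomes)
    if isinstance(item, Exception):
        raise item
    return item


def invalid_call() -> str:
    raise ValidationError("missing required field")


waits: List[float] = []
result = call_with_retries(flaky_call, ToyRetryPolicy(), sleep=waits.append)

skipped_waits: List[float] = []
try:
    call_with_retries(invalid_call, ToyRetryPolicy(), sleep=skipped_waits.append)
except ValidationError:
    pass  # failed on the first attempt; no retries were scheduled
```

Passing `sleep` in as a parameter keeps the sketch instant to run; here it only records the waits it would have taken.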

### Activity vs Workflow Distinction

**Workflows** are orchestration logic that must be deterministic:
- No direct I/O operations
- No direct use of the `random` module; use `workflow.random()`, which is seeded deterministically
- No system clock reads such as `time.time()` or `datetime.now()`; use `workflow.now()`
- Pure coordination and decision making

**Activities** handle non-deterministic operations:
- External API calls
- Database operations
- File I/O
- Any side effects

```python
from datetime import timedelta

from temporalio import activity, workflow

from mcp_agent.agents.agent import Agent
from mcp_agent.workflows.llm.augmented_llm_openai import OpenAIAugmentedLLM


@activity.defn
async def validate_data_activity(data: dict) -> dict:
    """✅ Activity: non-deterministic operations are allowed here."""
    agent = Agent(name="validator", server_names=["database", "api"])
    async with agent:
        llm = await agent.attach_llm(OpenAIAugmentedLLM)

        # ✅ External I/O operations belong in activities
        validation_result = await llm.generate_str(
            "Validate this data against the external service and answer "
            f"VALID or INVALID: {data}"
        )

        # startswith() avoids treating "INVALID" as valid ("valid" is a substring of it)
        is_valid = validation_result.strip().upper().startswith("VALID")
        return {"is_valid": is_valid, "result": validation_result}


@app.workflow
class ProperWorkflow(Workflow[dict]):
    @app.workflow_run
    async def run(self, input: dict) -> WorkflowResult[dict]:
        # ✅ Workflow: pure orchestration logic
        if input.get("requires_validation"):
            # ✅ Delegate external operations to an activity
            validated = await workflow.execute_activity(
                validate_data_activity,
                input,
                start_to_close_timeout=timedelta(minutes=2),
            )

            # ✅ Workflow: decision making based on activity results
            # (process_valid_data and handle_invalid_data are your own helpers
            # that return a WorkflowResult)
            if validated.get("is_valid"):
                return await self.process_valid_data(validated)
            else:
                return await self.handle_invalid_data(validated)

        return WorkflowResult(value=input)
```
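Replay is why these rules matter: on recovery, Temporal re-runs the workflow code and expects it to issue the same commands as before. The toy below (hypothetical names, not Temporal's implementation) shows a branch driven by a random draw. It replays identically only when the generator is seeded the same way on both runs, which is the guarantee `workflow.random()` provides inside a real workflow.

```python
import random
from typing import Callable, List


def routing_workflow(rng: random.Random) -> List[str]:
    """Toy workflow body: returns the commands it would ask the server to run."""
    commands = []
    # A branching decision that depends on a random draw
    if rng.random() < 0.5:
        commands.append("schedule_activity:fast_path")
    else:
        commands.append("schedule_activity:slow_path")
    commands.append("complete_workflow")
    return commands


def replays_identically(run: Callable[[], List[str]]) -> bool:
    """Execute the workflow body twice (original run and replay) and compare."""
    return run() == run()


EXECUTION_SEED = 1234  # stands in for a seed fixed once per workflow execution

deterministic_ok = replays_identically(lambda: routing_workflow(random.Random(EXECUTION_SEED)))
# With random.Random() (seeded from the OS), the two runs may choose different
# branches, which Temporal would report as a non-determinism error on replay.
```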

## Advanced Workflow Features

### Signal and Query Handlers

Signals let external systems send data to a running workflow, and queries let them read its current state without changing it:

```python
from temporalio import workflow
from typing import Optional

@app.workflow
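class ApprovalWorkflow(Workflow[dict]):
    def __init__(self, *args, **kwargs):
        # Keep the base Workflow initialization, then add our own state
        super().__init__(*args, **kwargs)
        self.approval_status: Optional[str] = None
        self.approval_comments: Optional[str] = None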
    
    @workflow.signal
    async def approve_signal(self, comments: str):
        """Signal handler for approval."""
        self.approval_status = "approved"
        self.approval_comments = comments
    
    @workflow.signal
    async def reject_signal(self, reason: str):
        """Signal handler for rejection."""
        self.approval_status = "rejected"
        self.approval_comments = reason
    
    @workflow.query
    def get_status(self) -> dict:
        """Query handler to check current status."""
        return {
            "status": self.approval_status,
            "comments": self.approval_comments
        }
    
    @app.workflow_run
    async def run(self, document: dict) -> WorkflowResult[dict]:
        # Process initial document
        agent = Agent(name="processor", server_names=["filesystem"])
        async with agent:
            llm = await agent.attach_llm(OpenAIAugmentedLLM)
            processed = await llm.generate_str(f"Process document: {document}")
        
        # Wait for approval signal (can wait indefinitely)
        await workflow.wait_condition(lambda: self.approval_status is not None)
        
        if self.approval_status == "approved":
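            # Continue with approved workflow; attach the LLM again inside the
            # new agent context
            async with agent:
                llm = await agent.attach_llm(OpenAIAugmentedLLM)
                finalized = await llm.generate_str(
                    f"Finalize approved document: {processed}. Comments: {self.approval_comments}"
                )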
            
            return WorkflowResult(value={
                "status": "completed",
                "document": finalized,
                "approval_comments": self.approval_comments
            })
        else:
            # Handle rejection
            return WorkflowResult(
                value=None,
                error=f"Document rejected: {self.approval_comments}"
            )

# Send signals from external code
async def send_approval():
    async with app.run() as agent_app:
        executor = agent_app.executor
        
        # Send approval signal to running workflow
        await executor.signal_workflow(
            "ApprovalWorkflow",
            "workflow-123",
            "approve_signal",
            "Document looks good after review!"
        )
        
        # Query workflow status
        status = await executor.query_workflow(
            "ApprovalWorkflow",
            "workflow-123",
            "get_status"
        )
        print(f"Workflow status: {status}")
```
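The signal, query, and `wait_condition` trio can be pictured with plain asyncio. The class below is a hypothetical stand-in, not Temporal code: the signal method mutates state and wakes waiters, the query method only reads, and `run()` blocks until the condition holds. In a real workflow Temporal persists each signal in the history, so the wait survives worker restarts; this in-memory version does not.

```python
import asyncio
from typing import Optional, Tuple


class ToyApprovalWorkflow:
    """In-memory model of signal / query / wait_condition (not Temporal)."""

    def __init__(self) -> None:
        self.approval_status: Optional[str] = None
        self.approval_comments: Optional[str] = None
        self._changed = asyncio.Condition()

    async def approve_signal(self, comments: str) -> None:
        # Like a signal handler: update state, then wake any waiters
        async with self._changed:
            self.approval_status = "approved"
            self.approval_comments = comments
            self._changed.notify_all()

    def get_status(self) -> dict:
        # Like a query handler: read-only view of the current state
        return {"status": self.approval_status, "comments": self.approval_comments}

    async def run(self) -> str:
        # Like workflow.wait_condition(lambda: self.approval_status is not None)
        async with self._changed:
            await self._changed.wait_for(lambda: self.approval_status is not None)
        return f"{self.approval_status}: {self.approval_comments}"


async def demo() -> Tuple[dict, str]:
    wf = ToyApprovalWorkflow()
    run_task = asyncio.create_task(wf.run())
    await asyncio.sleep(0)  # let run() start waiting
    status_before = wf.get_status()  # query: does not change state
    await wf.approve_signal("Looks good")  # signal: wakes the waiting run()
    return status_before, await run_task


status_before, outcome = asyncio.run(demo())
```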

### Workflow Versioning

Handle workflow updates without breaking running instances:

```python
from temporalio import workflow


@app.workflow
class VersionedWorkflow(Workflow[dict]):
    @app.workflow_run
    async def run(self, data: dict) -> WorkflowResult[dict]:
        # The Temporal Python SDK versions workflow code with patches.
        # workflow.patched(id) returns True when this code runs for the first
        # time (recording a marker in history) and, during replay, only if the
        # history contains that marker. Executions recorded under older code
        # therefore keep taking their original branch.
        if workflow.patched("data-processing-v3"):
            # Latest version with advanced features
            validated = await self.validate_data_v2(data)
            enriched = await self.enrich_data(validated)
            result = await self.process_v3(enriched)
        elif workflow.patched("data-processing-v2"):
            # Enhanced processing with validation
            validated = await self.validate_data(data)
            result = await self.process_v2(validated)
        else:
            # Original processing logic
            result = await self.process_v1(data)

        # Common post-processing (no versioning needed). validate_data,
        # validate_data_v2, enrich_data and post_process are your own helpers.
        final_result = await self.post_process(result)

        return WorkflowResult(value=final_result)

    async def process_v1(self, data: dict) -> str:
        """Original processing logic."""
        agent = Agent(name="processor_v1", server_names=["filesystem"])
        async with agent:
            llm = await agent.attach_llm(OpenAIAugmentedLLM)
            return await llm.generate_str(f"Process v1: {data}")

    async def process_v2(self, data: dict) -> str:
        """Enhanced processing with validation."""
        agent = Agent(name="processor_v2", server_names=["filesystem", "validation"])
        async with agent:
            llm = await agent.attach_llm(OpenAIAugmentedLLM)
            return await llm.generate_str(f"Process v2 with validation: {data}")

    async def process_v3(self, data: dict) -> str:
        """Latest version with advanced features."""
        agent = Agent(name="processor_v3", server_names=["filesystem", "validation", "ml"])
        async with agent:
            llm = await agent.attach_llm(OpenAIAugmentedLLM)
            return await llm.generate_str(f"Process v3 with ML enhancement: {data}")
```
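In the Temporal Python SDK, versioning is expressed with `workflow.patched(...)` markers: a new execution records the marker and takes the new branch, while a replay takes the new branch only if its history already contains the marker. The simplified model below (a hypothetical `ToyPatchContext`, ignoring `workflow.deprecate_patch` and other SDK details) shows which branch new executions and replays of older executions take when patches are checked newest first.

```python
from typing import Set


class ToyPatchContext:
    """Simplified model of workflow.patched() marker semantics (not the SDK)."""

    def __init__(self, recorded_markers: Set[str], replaying: bool) -> None:
        self.markers = set(recorded_markers)
        self.replaying = replaying

    def patched(self, patch_id: str) -> bool:
        if self.replaying:
            # During replay, take the new branch only if history has the marker
            return patch_id in self.markers
        # Executing for the first time: record the marker and take the new branch
        self.markers.add(patch_id)
        return True


def choose_branch(ctx: ToyPatchContext) -> str:
    # Newest patch first, falling back to older branches
    if ctx.patched("data-processing-v3"):
        return "v3"
    elif ctx.patched("data-processing-v2"):
        return "v2"
    return "v1"


new_execution = choose_branch(ToyPatchContext(set(), replaying=False))
v2_era_replay = choose_branch(ToyPatchContext({"data-processing-v2"}, replaying=True))
v1_era_replay = choose_branch(ToyPatchContext(set(), replaying=True))
```

Note that outside replay even an older execution takes the newest branch, which is why each new branch must be safe to run from that point onward.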

### Workflow Timeouts and Cancellation

Set timeouts on each activity and on the workflow as a whole, and handle cancellation explicitly:

```python
import asyncio
from datetime import timedelta

from temporalio import activity, workflow

from mcp_agent.agents.agent import Agent
from mcp_agent.workflows.llm.augmented_llm_openai import OpenAIAugmentedLLM

# quick_process and medium_process are your own @activity.defn functions (not
# shown). Like long_process below, they must be registered on the worker.


@activity.defn
async def long_process(data: dict) -> dict:
    """Long-running activity that reports progress with heartbeats."""
    agent = Agent(name="long_processor", server_names=["ml", "database"])
    async with agent:
        llm = await agent.attach_llm(OpenAIAugmentedLLM)
        chunks = []
        for i in range(60):
            # Heartbeats must arrive at least once per heartbeat_timeout,
            # otherwise Temporal fails this attempt (and may retry it)
            activity.heartbeat(f"Processing step {i + 1}/60")
            chunks.append(await llm.generate_str(f"Process chunk {i}: {data}"))
        return {"processed": True, "chunks": chunks}


@app.workflow
class TimeoutWorkflow(Workflow[dict]):
    @app.workflow_run
    async def run(self, data: dict) -> WorkflowResult[dict]:
        try:
            # Cap the three steps at 2 hours in total. Inside a Temporal workflow
            # the SDK implements this wait with a durable timer. You can also bound
            # the whole execution with an execution timeout when starting it.
            result = await asyncio.wait_for(
                self.run_steps(data), timeout=timedelta(hours=2).total_seconds()
            )
            return WorkflowResult(value=result)
        except asyncio.TimeoutError:
            # Handle timeout gracefully
            return WorkflowResult(value=None, error="Workflow timed out after 2 hours")
        except asyncio.CancelledError:
            # Returning here completes the workflow normally; re-raise instead
            # if it should be recorded as cancelled
            return WorkflowResult(value=None, error="Workflow was cancelled")

    async def run_steps(self, data: dict) -> dict:
        # Step 1: Quick processing (30 seconds per attempt)
        result1 = await workflow.execute_activity(
            quick_process, data, start_to_close_timeout=timedelta(seconds=30)
        )

        # Step 2: Medium processing (5 minutes per attempt)
        result2 = await workflow.execute_activity(
            medium_process, result1, start_to_close_timeout=timedelta(minutes=5)
        )

        # Step 3: Long processing: 1 hour per attempt, 90 minutes in total
        # including retries, and a heartbeat at least every 2 minutes
        return await workflow.execute_activity(
            long_process,
            result2,
            start_to_close_timeout=timedelta(hours=1),
            schedule_to_close_timeout=timedelta(hours=1, minutes=30),
            heartbeat_timeout=timedelta(minutes=2),
        )
```
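A heartbeat timeout is a liveness check: the activity must report progress at least once per `heartbeat_timeout`, or Temporal treats the attempt as failed without waiting for the much longer `start_to_close_timeout`. The asyncio sketch below imitates that check locally. `run_with_heartbeat_timeout` is a hypothetical helper, not part of the Temporal SDK, and the durations are shrunk to fractions of a second.

```python
import asyncio
from typing import Awaitable, Callable, Tuple

Heartbeat = Callable[[str], None]


async def run_with_heartbeat_timeout(
    work: Callable[[Heartbeat], Awaitable[str]], timeout: float
) -> str:
    """Fail the work if it goes longer than `timeout` seconds without a heartbeat."""
    beat = asyncio.Event()
    last_detail = {"value": None}

    def heartbeat(detail: str) -> None:
        last_detail["value"] = detail
        beat.set()

    task = asyncio.create_task(work(heartbeat))
    while True:
        beat_wait = asyncio.create_task(beat.wait())
        done, _ = await asyncio.wait(
            {task, beat_wait}, timeout=timeout, return_when=asyncio.FIRST_COMPLETED
        )
        beat_wait.cancel()
        if task in done:
            return task.result()
        if not done:
            # Neither finished nor heartbeated in time: give up on this attempt
            task.cancel()
            raise TimeoutError(f"heartbeat timeout; last detail: {last_detail['value']}")
        beat.clear()  # got a heartbeat; wait for the next one


async def steady_work(heartbeat: Heartbeat) -> str:
    for i in range(3):
        heartbeat(f"step {i + 1}/3")
        await asyncio.sleep(0.01)  # a short chunk of work
    return "done"


async def stuck_work(heartbeat: Heartbeat) -> str:
    heartbeat("step 1/3")
    await asyncio.sleep(1)  # hangs longer than the heartbeat timeout
    return "never reached"


async def main() -> Tuple[str, str]:
    ok = await run_with_heartbeat_timeout(steady_work, timeout=0.1)
    try:
        await run_with_heartbeat_timeout(stuck_work, timeout=0.05)
        stuck = "completed"
    except TimeoutError as exc:
        stuck = str(exc)
    return ok, stuck


ok_result, stuck_result = asyncio.run(main())
```

The last heartbeat detail is what makes the failure diagnosable; Temporal likewise keeps the most recent heartbeat details for the next attempt.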

## Core Concepts

### Workflow Definition

Temporal workflows are defined the same way as asyncio workflows:

```python
from mcp_agent.app import MCPApp
from mcp_agent.executor.workflow import Workflow, WorkflowResult
from mcp_agent.agents.agent import Agent
from mcp_agent.workflows.llm.augmented_llm_openai import OpenAIAugmentedLLM

app = MCPApp(name="temporal_agent")

@app.workflow
class DurableWorkflow(Workflow[dict]):
    """A durable workflow that can survive failures."""
    
    @app.workflow_run
    async def run(self, request: dict) -> WorkflowResult[dict]:
        # This workflow is durable - it will resume from
        # where it left off if the worker crashes
        
        agent = Agent(
            name="analyst",
            instruction="Analyze the provided data thoroughly.",
            server_names=["fetch", "filesystem"]
        )
        
        async with agent:
            llm = await agent.attach_llm(OpenAIAugmentedLLM)
            
            # Each step is automatically checkpointed
            step1 = await llm.generate_str(f"Analyze: {request['data']}")
            step2 = await llm.generate_str(f"Summarize findings: {step1}")
            step3 = await llm.generate_str(f"Generate report: {step2}")
            
            return WorkflowResult(value={
                "analysis": step1,
                "summary": step2,
                "report": step3
            })
```

### Signals for Human-in-the-Loop

Implement workflows that wait for human input:

```python
from mcp_agent.executor.temporal import Signal

@app.workflow
class ApprovalWorkflow(Workflow[str]):
    @app.workflow_run
    async def run(self, document: str) -> WorkflowResult[str]:
        # Process document with AI
        agent = Agent(
            name="processor",
            instruction="Process and improve the document.",
            server_names=["filesystem"]
        )
        
        async with agent:
            llm = await agent.attach_llm(OpenAIAugmentedLLM)
            processed = await llm.generate_str(f"Improve this document: {document}")
        
        # Wait for human approval
        print(f"Waiting for approval. Workflow ID: {self.id}, Run ID: {self.run_id}")
        
        await app.context.executor.signal_bus.wait_for_signal(
            Signal(name="approve", workflow_id=self.id, run_id=self.run_id)
        )
        
        # Continue after approval
        print("Approval received! Finalizing document...")
        
        async with agent:
            llm = await agent.attach_llm(OpenAIAugmentedLLM)
            finalized = await llm.generate_str(f"Finalize approved document: {processed}")
        
        return WorkflowResult(value=finalized)
```

Send signals from external code:

```python
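# Send approval signal from an async function while the app is running.
# Use the workflow ID and run ID that the workflow printed while waiting.
await app.context.executor.signal_bus.send_signal(
    Signal(
        name="approve",
        workflow_id="<workflow-id>",
        run_id="<run-id>",
        payload={"approved_by": "john.doe", "comments": "Looks good!"}
    )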
```

### Long-Running Workflows

Handle workflows that run for extended periods:

```python
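import asyncio
from datetime import timedelta

@app.workflow
class MonitoringWorkflow(Workflow[dict]):
    @app.workflow_run
    async def run(self, config: dict) -> WorkflowResult[dict]:
        monitoring_results = []

        # Run for 30 days, checking every hour (720 checks). Every check adds
        # events to the workflow history, which Temporal caps in size; for runs
        # this long, consider workflow.continue_as_new() to start a fresh run
        # that carries forward only a compact state.
        for day in range(30):
            for hour in range(24):
                # Durable sleep: in a Temporal workflow this becomes a server-side
                # timer, so it survives worker restarts
                await asyncio.sleep(timedelta(hours=1).total_seconds())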
                
                # Check system status
                agent = Agent(
                    name="monitor",
                    instruction="Check system health and report issues.",
                    server_names=["fetch"]
                )
                
                async with agent:
                    llm = await agent.attach_llm(OpenAIAugmentedLLM)
                    status = await llm.generate_str(f"Check status of: {config['systems']}")
                    
                    monitoring_results.append({
                        "day": day,
                        "hour": hour,
                        "status": status
                    })
                    
                    # Alert if issues found (send_alert is your own helper, not shown)
                    if "critical" in status.lower():
                        await self.send_alert(status)
        
        return WorkflowResult(value={"monitoring_complete": monitoring_results})
```
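Temporal limits how large a single execution's history can grow, so month-long loops like the one above are usually split with `workflow.continue_as_new`, which completes the current run and starts a new run of the same workflow with a fresh history and whatever arguments you pass. The stdlib-only model below illustrates that batching; the one-week batch size, `MonitorState`, and the fake status check standing in for the LLM call are all hypothetical.

```python
from dataclasses import dataclass
from typing import Tuple

TOTAL_CHECKS = 30 * 24  # 30 days of hourly checks
CHECKS_PER_RUN = 24 * 7  # hypothetical batch: start a fresh run every week


@dataclass
class MonitorState:
    next_check: int = 0  # index of the next hourly check
    critical_count: int = 0  # compact summary carried across runs


def fake_status(check: int) -> str:
    # Stand-in for the LLM status check: every 100th check is critical
    return "CRITICAL: disk full" if check % 100 == 0 else "all systems normal"


def one_run(state: MonitorState) -> Tuple[str, MonitorState]:
    """One workflow run: process a batch, then finish or continue as new."""
    end = min(state.next_check + CHECKS_PER_RUN, TOTAL_CHECKS)
    critical = state.critical_count
    for check in range(state.next_check, end):
        if "critical" in fake_status(check).lower():
            critical += 1
    new_state = MonitorState(next_check=end, critical_count=critical)
    outcome = "complete" if end >= TOTAL_CHECKS else "continue_as_new"
    return outcome, new_state


def drive() -> Tuple[int, MonitorState]:
    """Plays Temporal's role: start each new run with the state from the last one."""
    state = MonitorState()
    runs = 0
    while True:
        runs += 1
        outcome, state = one_run(state)
        if outcome == "complete":
            return runs, state


runs, final_state = drive()
```

Carrying a count instead of every hourly result keeps each run's input small, which is the point of continuing as new.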

## Advanced Patterns

### Parallel Agent Execution

Run multiple agents in parallel with Temporal:

```python
import asyncio

@app.workflow
class ParallelAnalysisWorkflow(Workflow[dict]):
    @app.workflow_run
    async def run(self, document: str) -> WorkflowResult[dict]:
        # Define parallel tasks
        async def analyze_sentiment():
            agent = Agent(name="sentiment", instruction="Analyze sentiment.")
            async with agent:
                llm = await agent.attach_llm(OpenAIAugmentedLLM)
                return await llm.generate_str(f"Analyze sentiment: {document}")
        
        async def extract_entities():
            agent = Agent(name="entities", instruction="Extract entities.")
            async with agent:
                llm = await agent.attach_llm(OpenAIAugmentedLLM)
                return await llm.generate_str(f"Extract entities: {document}")
        
        async def summarize():
            agent = Agent(name="summarizer", instruction="Summarize content.")
            async with agent:
                llm = await agent.attach_llm(OpenAIAugmentedLLM)
                return await llm.generate_str(f"Summarize: {document}")
        
        # Execute in parallel - Temporal handles orchestration
        sentiment, entities, summary = await asyncio.gather(
            analyze_sentiment(),
            extract_entities(),
            summarize()
        )
        
        return WorkflowResult(value={
            "sentiment": sentiment,
            "entities": entities,
            "summary": summary
        })
```
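`asyncio.gather` returns results in the order the coroutines were passed, not the order in which they finish, which is why unpacking into `sentiment, entities, summary` above is safe. A quick stdlib demonstration with hypothetical `fake_agent` calls that finish in reverse order:

```python
import asyncio
from typing import List, Tuple


async def fake_agent(name: str, delay: float, finished: List[str]) -> str:
    # Stand-in for an agent call that takes `delay` seconds
    await asyncio.sleep(delay)
    finished.append(name)
    return f"{name} result"


async def main() -> Tuple[List[str], List[str]]:
    finished: List[str] = []
    results = await asyncio.gather(
        fake_agent("sentiment", 0.03, finished),
        fake_agent("entities", 0.02, finished),
        fake_agent("summary", 0.01, finished),
    )
    return list(results), finished


results, completion_order = asyncio.run(main())
```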

### Workflow Composition

Compose complex workflows from simpler ones:

```python
@app.workflow
class DataPipelineWorkflow(Workflow[dict]):
    @app.workflow_run
    async def run(self, source: str) -> WorkflowResult[dict]:
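        # These sub-workflows (defined elsewhere) run inline: their steps become
        # part of this workflow's execution and history. Use Temporal child
        # workflows instead if each stage needs its own execution and history.

        # Step 1: Data extraction workflow
        extraction = DataExtractionWorkflow()
        data = await extraction.run(source)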
        
        # Step 2: Data validation workflow
        validation = DataValidationWorkflow()
        validated = await validation.run(data.value)
        
        # Step 3: Data processing workflow
        processing = DataProcessingWorkflow()
        processed = await processing.run(validated.value)
        
        # Step 4: Report generation workflow
        reporting = ReportGenerationWorkflow()
        report = await reporting.run(processed.value)
        
        return WorkflowResult(value={
            "data": data.value,
            "validation": validated.value,
            "processed": processed.value,
            "report": report.value
        })
```

### Error Handling with Compensations

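Implement the saga pattern for distributed transactions: record a compensating action after each step succeeds, and if a later step fails, run those compensations in reverse order: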

```python
@app.workflow
class OrderProcessingWorkflow(Workflow[dict]):
    @app.workflow_run
    async def run(self, order: dict) -> WorkflowResult[dict]:
        compensations = []
        
        try:
            # Step 1: Reserve inventory
            inventory_agent = Agent(name="inventory", server_names=["database"])
            async with inventory_agent:
                llm = await inventory_agent.attach_llm(OpenAIAugmentedLLM)
                reservation = await llm.generate_str(f"Reserve items: {order['items']}")
                compensations.append(("inventory", reservation))
            
            # Step 2: Process payment
            payment_agent = Agent(name="payment", server_names=["payment_api"])
            async with payment_agent:
                llm = await payment_agent.attach_llm(OpenAIAugmentedLLM)
                payment = await llm.generate_str(f"Process payment: {order['total']}")
                compensations.append(("payment", payment))
            
            # Step 3: Ship order
            shipping_agent = Agent(name="shipping", server_names=["shipping_api"])
            async with shipping_agent:
                llm = await shipping_agent.attach_llm(OpenAIAugmentedLLM)
                shipment = await llm.generate_str(f"Ship to: {order['address']}")
            
            return WorkflowResult(value={
                "success": True,
                "reservation": reservation,
                "payment": payment,
                "shipment": shipment
            })
            
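        except Exception as e:
            # Run compensations in reverse order, continuing past any that fail
            failed = []
            for service, data in reversed(compensations):
                try:
                    await self.compensate(service, data)
                except Exception as comp_error:
                    failed.append(f"{service}: {comp_error}")

            return WorkflowResult(
                value=None,
                error=f"Order failed: {e}. Compensations executed; failed compensations: {failed or 'none'}"
            )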
    
    async def compensate(self, service: str, data: str):
        """Execute compensation for failed step."""
        agent = Agent(name=f"{service}_compensation")
        async with agent:
            llm = await agent.attach_llm(OpenAIAugmentedLLM)
            await llm.generate_str(f"Compensate {service}: {data}")
```
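Stripped of the agents, the saga bookkeeping is a stack of undo actions. The stdlib-only sketch below (hypothetical `run_saga` and step names) records a compensation after each successful step and, when a later step fails, runs them newest first while continuing past any compensation that itself fails.

```python
from typing import Callable, List, Tuple

Step = Tuple[str, Callable[[], str], Callable[[str], None]]


def run_saga(steps: List[Step], log: List[str]) -> bool:
    """Run steps in order; on failure, undo completed steps in reverse order."""
    completed: List[Tuple[str, Callable[[str], None], str]] = []
    try:
        for name, action, compensate in steps:
            result = action()
            log.append(f"did {name}")
            completed.append((name, compensate, result))
        return True
    except Exception as exc:
        log.append(f"failed: {exc}")
        for name, compensate, result in reversed(completed):
            try:
                compensate(result)
                log.append(f"undid {name}")
            except Exception as comp_exc:
                # Record and continue so the remaining compensations still run
                log.append(f"could not undo {name}: {comp_exc}")
        return False


class StepFailed(Exception):
    pass


def ship_order() -> str:
    raise StepFailed("carrier unavailable")


log: List[str] = []
succeeded = run_saga(
    [
        ("reserve_inventory", lambda: "reservation-1", lambda r: None),
        ("charge_payment", lambda: "payment-1", lambda r: None),
        ("ship_order", ship_order, lambda r: None),
    ],
    log,
)
```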

## Production Deployment

### Infrastructure Requirements

**Minimum Production Setup:**
- Temporal Server cluster (3+ nodes for HA)
- PostgreSQL/MySQL database with replication
- Elasticsearch for visibility (optional but recommended)
- Load balancer for Temporal frontend
- Monitoring stack (Prometheus, Grafana)

**Resource Planning:**
```yaml
# Example Kubernetes deployment
apiVersion: apps/v1
kind: Deployment
metadata:
  name: temporal-server
spec:
  replicas: 3
  selector:
    matchLabels:
      app: temporal-server
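  template:
    metadata:
      labels:
        app: temporal-server  # must match spec.selector.matchLabels
    spec:
      containers:
      - name: temporal
        # auto-setup is meant for development and testing; for production,
        # use the temporalio/server image (or Temporal's Helm charts) with a
        # pinned version tag
        image: temporalio/auto-setup:latest
        resources:
          requests:
            memory: "1Gi"
            cpu: "500m"
          limits:
            memory: "2Gi"
            cpu: "1000m"
        env:
        - name: DB
          value: postgresql
        - name: DB_PORT
          value: "5432"
        - name: POSTGRES_SEEDS
          value: postgres-primary  # hostname only; the port comes from DB_PORT
        - name: DYNAMIC_CONFIG_FILE_PATH
          value: /etc/temporal/config/dynamicconfig.yaml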
```

### High Availability Configuration

Configure Temporal for production resilience:

```yaml
# temporal-server-config.yaml
persistence:
  defaultStore: default
  visibilityStore: visibility
  numHistoryShards: 512
  datastores:
    default:
      sql:
        pluginName: postgres
        databaseName: temporal
        connectAddr: postgres-cluster:5432
        connectProtocol: tcp
        maxConns: 20
        maxIdleConns: 20
        maxConnLifetime: 1h
    visibility:
      sql:
        pluginName: postgres
        databaseName: temporal_visibility
        connectAddr: postgres-cluster:5432

global:
  membership:
    maxJoinDuration: 30s
    broadcastAddress: "<this-node-ip>"  # routable address other nodes use to reach this one; 0.0.0.0 does not work here
  pprof:
    port: 7936

services:
  frontend:
    rpc:
      grpcPort: 7233
      membershipPort: 6933
      bindOnLocalHost: false
  history:
    rpc:
      grpcPort: 7234
      membershipPort: 6934
      bindOnLocalHost: false
  matching:
    rpc:
      grpcPort: 7235
      membershipPort: 6935
      bindOnLocalHost: false
  worker:
    rpc:
      grpcPort: 7236
      membershipPort: 6936
      bindOnLocalHost: false

clusterMetadata:
  enableGlobalNamespace: true
  failoverVersionIncrement: 10
  masterClusterName: primary
  currentClusterName: primary
  clusterInformation:
    primary:
      enabled: true
      initialFailoverVersion: 0
      rpcName: frontend
      rpcAddress: "<frontend-host>:7233"  # frontend address reachable from the other nodes
```
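`numHistoryShards` is fixed when the cluster is first created and cannot be changed afterwards, because every workflow is assigned to a shard by hashing its identifiers modulo the shard count. The toy below uses `zlib.crc32` as a stand-in for Temporal's actual hash function (`toy_shard_for` is hypothetical) to show why: the assignment is stable for a given count, but switching from 512 to 1024 shards would move roughly half of all workflows to a different shard.

```python
import zlib


def toy_shard_for(namespace: str, workflow_id: str, num_shards: int) -> int:
    """Map a workflow to a shard (crc32 stands in for Temporal's real hash)."""
    return zlib.crc32(f"{namespace}/{workflow_id}".encode()) % num_shards


workflow_ids = [f"order-{i}" for i in range(10_000)]

# The mapping is stable for a fixed shard count...
first = [toy_shard_for("default", w, 512) for w in workflow_ids]
again = [toy_shard_for("default", w, 512) for w in workflow_ids]

# ...but changing the count reassigns a large share of workflows
moved = sum(
    toy_shard_for("default", w, 512) != toy_shard_for("default", w, 1024)
    for w in workflow_ids
)
moved_fraction = moved / len(workflow_ids)
```

Pick a shard count with headroom for future load up front, since growing it later means building a new cluster.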

### Temporal Cloud

For a managed production deployment, you can connect mcp-agent to Temporal Cloud instead of running your own cluster:

```yaml mcp_agent.config.yaml
execution_engine: temporal

temporal:
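  # Temporal Cloud namespace endpoints have the form <namespace>.<account-id>.tmprl.cloud
  host: your-namespace.your-account-id.tmprl.cloud
  port: 7233
  namespace: your-namespace.your-account-id
  task_queue: mcp-agent-production
  tls:
    client_cert_path: /path/to/client.crt
    client_key_path: /path/to/client.key
    ca_cert_path: /path/to/ca.crt
    server_name: your-namespace.your-account-id.tmprl.cloud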
  data_converter:
    encryption_key: ${TEMPORAL_ENCRYPTION_KEY}
    codec: aes256gcm
  retry_policy:
    initial_interval: 1
    maximum_interval: 100
    backoff_coefficient: 2
    maximum_attempts: 50
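  # API keys are an alternative to the mTLS certificates above; use the
  # method your namespace is configured for
  auth:
    api_key: ${TEMPORAL_API_KEY}
    namespace: your-namespace.your-account-id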
```

### Security Best Practices

**Data Encryption:**
```python
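import dataclasses
from typing import List, Sequence

import temporalio.converter
from cryptography.fernet import Fernet
from temporalio.api.common.v1 import Payload
from temporalio.client import Client


class FernetCodec(temporalio.converter.PayloadCodec):
    """Encrypts every payload before it is sent to the Temporal server."""

    def __init__(self, key: bytes) -> None:
        self._fernet = Fernet(key)

    async def encode(self, payloads: Sequence[Payload]) -> List[Payload]:
        return [
            Payload(metadata={"encoding": b"binary/encrypted"},
                    data=self._fernet.encrypt(p.SerializeToString()))
            for p in payloads
        ]

    async def decode(self, payloads: Sequence[Payload]) -> List[Payload]:
        # Decrypt payloads this codec produced; pass any others through unchanged
        return [
            Payload.FromString(self._fernet.decrypt(p.data))
            if p.metadata.get("encoding") == b"binary/encrypted" else p
            for p in payloads
        ]


async def connect_encrypted_client(encryption_key: bytes) -> Client:
    # Load the key from a secret store (create one once with Fernet.generate_key()).
    # Workers need the same codec, or they cannot read these payloads.
    return await Client.connect(
        "your-namespace.your-account-id.tmprl.cloud:7233",
        namespace="your-namespace.your-account-id",
        data_converter=dataclasses.replace(
            temporalio.converter.default(), payload_codec=FernetCodec(encryption_key)
        ),
        tls=True,  # for mTLS, pass a temporalio.service.TLSConfig with your client cert
    )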
```

**Access Control:**
```yaml
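# Illustrative access policy, not a Temporal configuration file. Self-hosted
# Temporal enforces authorization through its pluggable ClaimMapper and
# Authorizer, and Temporal Cloud through account and namespace roles; map
# these roles onto whichever mechanism you use.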
namespaces:
  production:
    retention: "30d"
    archival:
      history:
        state: "enabled"
        uri: "s3://temporal-history-archive"
      visibility:
        state: "enabled"
        uri: "s3://temporal-visibility-archive"
    authorization:
      default_role: "worker"
      roles:
        admin:
          permissions:
            - "namespace:*"
            - "workflow:*"
            - "activity:*"
        worker:
          permissions:
            - "workflow:execute"
            - "activity:execute"
        monitor:
          permissions:
            - "workflow:read"
            - "namespace:read"
```

**Network Security:**
```yaml
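# Network policies for Kubernetes
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: temporal-network-policy
spec:
  podSelector:
    matchLabels:
      app: temporal
  policyTypes:
  - Ingress
  - Egress
  ingress:
  # mcp-agent workers reach the Temporal frontend
  - from:
    - podSelector:
        matchLabels:
          app: mcp-agent-worker
    ports:
    - protocol: TCP
      port: 7233
  # Temporal services (frontend, history, matching, worker) talk to each other
  - from:
    - podSelector:
        matchLabels:
          app: temporal
  egress:
  # Persistence database
  - to:
    - podSelector:
        matchLabels:
          app: postgres
    ports:
    - protocol: TCP
      port: 5432
  # Other Temporal services
  - to:
    - podSelector:
        matchLabels:
          app: temporal
  # DNS lookups, e.g. to resolve the database hostname
  - to:
    - namespaceSelector: {}
    ports:
    - protocol: UDP
      port: 53
    - protocol: TCP
      port: 53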
```
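After applying the policy, check from inside a worker pod that the Temporal frontend is still reachable. You can also check that blocked callers really are blocked. A minimal stdlib probe; `can_connect` is an illustrative helper:

```python
import socket

def can_connect(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Refused connections, timeouts (dropped packets) and DNS failures all land here
        return False
```

For example, run `can_connect("temporal-frontend", 7233)` from a worker pod, where `temporal-frontend` stands in for your Temporal frontend Service name. Expect `True` from the worker pod and `False` from an unlabeled debug pod.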

### Worker Scaling

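Scale out by running more worker processes that poll the same task queue; Temporal distributes tasks across all of them. Temporal worker options such as `max_concurrent_activities`, `max_concurrent_workflow_tasks`, and `max_cached_workflows` bound concurrency within each process. Check which of these your mcp-agent version exposes before relying on them.

```python
# worker.py for production
import asyncio

from mcp_agent.executor.temporal import create_temporal_worker_for_app

from main import app  # your MCPApp instance (module name is an example)

async def main():
    # The worker polls the task queue configured under `temporal.task_queue`
    # in mcp_agent.config.yaml (for example, "mcp-agent-production")
    async with create_temporal_worker_for_app(app) as worker:
        await worker.run()

if __name__ == "__main__":
    # One worker per process; scale horizontally by starting more processes
    # or replicas against the same task queue
    asyncio.run(main())
```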
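The number of worker processes to run follows from two numbers: your peak count of concurrently executing activities and the per-worker activity limit (`max_concurrent_activities`). The sketch below is illustrative, not an mcp-agent setting. It assumes a hypothetical 20% headroom to cover restarts and rolling deploys.

```python
def workers_needed(peak_concurrent_activities: int, slots_per_worker: int, headroom_pct: int = 20) -> int:
    """Worker processes needed to absorb peak activity load plus a safety margin."""
    if slots_per_worker <= 0:
        raise ValueError("slots_per_worker must be positive")
    # Integer ceiling division avoids floating-point rounding at exact boundaries
    required = peak_concurrent_activities * (100 + headroom_pct)
    return max(1, -(-required // (slots_per_worker * 100)))

print(workers_needed(200, 50))                  # 5
print(workers_needed(200, 50, headroom_pct=0))  # 4
```

With 200 concurrent activities and 50 slots per worker, 20% headroom means 240 slots. That rounds up to 5 workers.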

### Monitoring and Observability

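Monitor workflows with the Temporal UI and export SDK and custom metrics to Prometheus:

```python
from temporalio import workflow
from temporalio.runtime import PrometheusConfig, Runtime, TelemetryConfig

from mcp_agent.executor.workflow import Workflow, WorkflowResult

# Expose SDK and custom metrics on a Prometheus endpoint
runtime = Runtime(
    telemetry=TelemetryConfig(
        metrics=PrometheusConfig(bind_address="0.0.0.0:9090")
    )
)
# Make it the process default before any Temporal client (including the worker's) is created
Runtime.set_default(runtime)

# Track custom metrics in workflows
@app.workflow
class MetricWorkflow(Workflow[str]):
    @app.workflow_run
    async def run(self, input: str) -> WorkflowResult[str]:
        # workflow.time() is replay-safe; time.time() would break determinism
        start_time = workflow.time()

        # Your workflow logic
        result = await self.process(input)

        # Record the duration with the workflow's metric meter (not recorded during replay)
        duration_ms = int((workflow.time() - start_time) * 1000)
        workflow.metric_meter().create_histogram(
            "workflow_duration", "Workflow run duration", "ms"
        ).record(duration_ms, {"workflow": "MetricWorkflow", "status": "success"})

        return WorkflowResult(value=result)
```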
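To confirm the Prometheus endpoint is serving data, scrape it and filter sample lines by metric name. A stdlib-only sketch, assuming the exporter answers on the conventional `/metrics` path at the `bind_address` shown above. Check the exact exported metric names, including any prefix the SDK adds, against your own endpoint rather than assuming them. `scrape_metric_lines` is an illustrative helper.

```python
from typing import List
from urllib.request import urlopen

def scrape_metric_lines(url: str, name_prefix: str, timeout: float = 5.0) -> List[str]:
    """Fetch a Prometheus text-format endpoint and return sample lines starting with name_prefix."""
    with urlopen(url, timeout=timeout) as resp:
        body = resp.read().decode("utf-8")
    return [
        line
        for line in body.splitlines()
        # Skip '# HELP' / '# TYPE' comment lines and blank lines
        if line and not line.startswith("#") and line.startswith(name_prefix)
    ]
```

For example, `scrape_metric_lines("http://localhost:9090/metrics", "workflow_duration")` should return histogram samples once `MetricWorkflow` has completed at least once.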

## Debugging

### Temporal Web UI

Access the Temporal Web UI at `http://localhost:8233` to:
- View all workflow executions
- Inspect workflow history step-by-step
- See pending activities and their retry attempts
- Send signals and queries to running workflows
- Download workflow history for offline debugging
- Monitor worker health and task queues

### Workflow Replay

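Debug production issues by replaying workflow history. Replay checks that your current workflow code makes the same decisions as the recorded run; a failure usually points to a non-deterministic change:

```python
import asyncio
from pathlib import Path

from temporalio.client import WorkflowHistory
from temporalio.worker import Replayer

async def debug_workflow():
    # History JSON downloaded from the Temporal UI (or with
    # `temporal workflow show --workflow-id <id> --output json`)
    history_json = Path("workflow_history.json").read_text()
    history = WorkflowHistory.from_json("my-workflow-id", history_json)

    # Register the same workflow classes your worker uses
    replayer = Replayer(workflows=[MyWorkflow])

    try:
        await replayer.replay_workflow(history)
        print("Replay succeeded: current code is compatible with this history")
    except Exception as e:
        # Usually a non-determinism error caused by a code change
        print(f"Replay failed: {e}")

asyncio.run(debug_workflow())
```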

### Testing with Time Skipping

Test long-running workflows efficiently:

```python
import pytest
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker

@pytest.mark.asyncio
async def test_long_running_workflow():
    # Start test environment with time skipping
    async with await WorkflowEnvironment.start_time_skipping() as env:
        # Create worker (also pass activities=[...] if the workflow calls activities)
        worker = Worker(
            env.client,
            task_queue="test-queue",
            workflows=[MonitoringWorkflow],
        )
        
        async with worker:
            # Start workflow
            handle = await env.client.start_workflow(
                MonitoringWorkflow.run,
                {"systems": ["api", "database"]},
                id="test-monitoring",
                task_queue="test-queue",
            )
            
            # Time automatically advances during sleep
            # 30 days completes instantly in tests
            result = await handle.result()
            
            assert len(result["monitoring_complete"]) == 720  # 30 days * 24 hours
```

## Migration Guide

### From Asyncio to Temporal

Your workflow code remains largely the same. Here's what changes:

<CodeGroup>
```yaml Before - mcp_agent.config.yaml
execution_engine: asyncio
logger:
  transports: [console]
  level: info
```

```yaml After - mcp_agent.config.yaml
execution_engine: temporal
temporal:
  host: "localhost:7233"
  namespace: default
  task_queue: mcp-agent
logger:
  transports: [console]
  level: info
```
</CodeGroup>

### Running Workflows

<CodeGroup>
```python Before - Asyncio
async with app.run():
    workflow = MyWorkflow()
    result = await workflow.run("input")
    print(result.value)
```

```python After - Temporal
# Start worker (separate process)
async with create_temporal_worker_for_app(app) as worker:
    await worker.run()

# Run workflow
async with app.run():
    executor = app.context.executor
    handle = await executor.start_workflow("MyWorkflow", "input")
    result = await handle.result()
    print(result)
```
</CodeGroup>

## Best Practices

<AccordionGroup>
  <Accordion title="Use Deterministic Code">
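    Workflow code must be deterministic: replaying the same history has to produce the same sequence of commands. Avoid:
    - Unseeded random numbers (use `workflow.random()` or `workflow.uuid4()`)
    - Reading the system clock (use `workflow.now()`)
    - Direct I/O such as network calls or file access (move it into activities)
    - Iteration order that can differ between processes, such as iterating over a `set` of strings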
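
    Set iteration order is a concrete example of the last point. Python randomizes string hashing per process, controlled by `PYTHONHASHSEED`. A workflow that loops over a `set` of strings and schedules one activity per element can therefore issue its commands in a different order when its history is replayed on another worker. The stdlib-only sketch below evaluates the same expressions under ten different hash seeds. The system names are placeholders.

    ```python
    import os
    import subprocess
    import sys

    SYSTEMS = "{'api', 'database', 'cache', 'queue', 'search'}"

    def output_under_seed(expression: str, seed: int) -> str:
        """Print `expression` in a fresh interpreter with a fixed PYTHONHASHSEED."""
        env = {**os.environ, "PYTHONHASHSEED": str(seed)}
        result = subprocess.run(
            [sys.executable, "-c", f"print({expression})"],
            env=env, capture_output=True, text=True, check=True,
        )
        return result.stdout.strip()

    # Iterating the set directly: the order depends on each process's hash seed
    unsorted_orders = {output_under_seed(f"list({SYSTEMS})", seed) for seed in range(10)}
    # Sorting first yields one order regardless of the seed
    sorted_orders = {output_under_seed(f"sorted({SYSTEMS})", seed) for seed in range(10)}

    print(len(unsorted_orders))  # usually several distinct orders
    print(len(sorted_orders))    # 1
    ```

    Inside a workflow, `for system in sorted(systems):` gives every replay the same order.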
  </Accordion>
  
  <Accordion title="Set Appropriate Timeouts">
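    Bound workflow duration when you start it. `execution_timeout` covers the whole execution, including retries and continue-as-new. `run_timeout` covers a single run. `task_timeout` covers a single workflow task, and Temporal caps it at 120 seconds:
    ```python
    from datetime import timedelta

    handle = await executor.start_workflow(
        "MyWorkflow",
        input_data,
        execution_timeout=timedelta(hours=1),
        run_timeout=timedelta(minutes=30),
        task_timeout=timedelta(seconds=30),
    )
    ```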
  </Accordion>
  
  <Accordion title="Use Workflow IDs">
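    Derive workflow IDs from business keys. Temporal allows only one open execution per workflow ID in a namespace, so resubmitting the same document does not start a duplicate run: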
    ```python
    workflow_id = f"process-document-{document_id}"
    handle = await executor.start_workflow(
        "DocumentWorkflow",
        document,
        workflow_id=workflow_id,
        id_reuse_policy="allow_duplicate_failed_only",
    )
    ```
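
    Some inputs have no natural business key. In that case you can derive the ID from a hash of the input instead, so resubmitting identical input maps to the same workflow ID. A minimal sketch, assuming the input is JSON-serializable. `content_workflow_id` is a hypothetical helper, not part of mcp-agent.

    ```python
    import hashlib
    import json
    from typing import Any

    def content_workflow_id(prefix: str, payload: Any) -> str:
        """Same payload (regardless of dict key order) -> same workflow ID."""
        # sort_keys and compact separators give a single canonical serialization
        canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
        digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:16]
        return f"{prefix}-{digest}"

    a = content_workflow_id("process-document", {"title": "Q3 report", "pages": 12})
    b = content_workflow_id("process-document", {"pages": 12, "title": "Q3 report"})
    print(a == b)  # True
    ```

    Truncating the SHA-256 digest to 16 hex characters (64 bits) keeps IDs readable. Accidental collisions remain very unlikely at typical volumes.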
  </Accordion>
  
  <Accordion title="Handle Versioning">
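    Version your workflows for safe updates. In the Python SDK, guard changed logic with `workflow.patched()` so that in-flight executions keep replaying the old path:
    ```python
    from temporalio import workflow

    @app.workflow
    class VersionedWorkflow(Workflow[str]):
        @app.workflow_run
        async def run(self, input: str) -> WorkflowResult[str]:
            # True for new executions (and replays of runs that recorded this patch);
            # False when replaying history recorded before the patch was deployed
            if workflow.patched("processing-logic-v2"):
                # New logic
                result = await self.process_v2(input)
            else:
                # Old logic, kept until no pre-patch executions remain
                result = await self.process_v1(input)

            return WorkflowResult(value=result)
    ```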
  </Accordion>
</AccordionGroup>

## Common Patterns

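### Polling External Systems

Poll until an external job reaches a terminal state. Inside a Temporal workflow, `asyncio.sleep` is backed by a durable timer, so a worker restart does not reset the wait:
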
```python
@app.workflow
class PollingWorkflow(Workflow[dict]):
    @app.workflow_run
    async def run(self, job_id: str) -> WorkflowResult[dict]:
        max_attempts = 100
        
        for attempt in range(max_attempts):
            # Check job status
            agent = Agent(name="checker", server_names=["api"])
            async with agent:
                llm = await agent.attach_llm(OpenAIAugmentedLLM)
                status = await llm.generate_str(f"Check job status: {job_id}")
            
            if "completed" in status:
                return WorkflowResult(value={"status": "completed", "result": status})
            
            if "failed" in status:
                return WorkflowResult(value=None, error=f"Job failed: {status}")
            
            # Wait before next poll (durable)
            await asyncio.sleep(60)  # 1 minute
        
        return WorkflowResult(value=None, error="Job timed out")
```

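### Scheduled Workflows

Run a task at recurring times while keeping history small. For fixed calendars, Temporal Schedules can also start workflows without a long-running loop.

```python
import asyncio

from temporalio import workflow

@app.workflow
class ScheduledWorkflow(Workflow[None]):
    @app.workflow_run
    async def run(self, schedule: dict) -> WorkflowResult[None]:
        """Run the task at the next scheduled time, then continue as new."""
        # Your helper computes the next run from workflow.now(), which is replay-safe
        next_run = self.calculate_next_run(schedule, workflow.now())
        delay = max((next_run - workflow.now()).total_seconds(), 0)
        await asyncio.sleep(delay)  # durable timer inside a Temporal workflow

        # Execute scheduled task
        await self.execute_scheduled_task()

        # Continue as new to prevent history growth; this raises and never returns
        workflow.continue_as_new(schedule)
```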
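The scheduled workflow leaves `calculate_next_run` to you. For a daily schedule, a sketch could look like the following. It assumes a hypothetical schedule dict shaped like `{"hour": 9, "minute": 30}` in UTC. It also takes the current time as an argument, so inside the workflow you pass `workflow.now()` instead of reading the system clock.

```python
from datetime import datetime, timedelta, timezone

def calculate_next_run(schedule: dict, now: datetime) -> datetime:
    """Next occurrence of schedule['hour']:schedule['minute'] UTC strictly after `now` (timezone-aware)."""
    candidate = now.astimezone(timezone.utc).replace(
        hour=schedule["hour"], minute=schedule["minute"], second=0, microsecond=0
    )
    # If today's slot has already passed (or is exactly now), use tomorrow's
    if candidate <= now:
        candidate += timedelta(days=1)
    return candidate

now = datetime(2024, 5, 1, 10, 0, tzinfo=timezone.utc)
print(calculate_next_run({"hour": 9, "minute": 30}, now))  # 2024-05-02 09:30:00+00:00
print(calculate_next_run({"hour": 18, "minute": 0}, now))  # 2024-05-01 18:00:00+00:00
```

Taking `now` as a parameter keeps the helper a pure function, which makes it safe to call from workflow code and easy to unit test.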

## Examples

Explore complete Temporal examples:

<CardGroup cols={2}>
  <Card 
    title="Basic Temporal Example" 
    icon="play"
    href="https://github.com/lastmile-ai/mcp-agent/tree/main/examples/temporal"
  >
    Simple workflows with Temporal
  </Card>
  <Card 
    title="MCP Agent Server (Temporal)" 
    icon="server"
    href="https://github.com/lastmile-ai/mcp-agent/tree/main/examples/mcp_agent_server/temporal"
  >
    Durable agent server implementation
  </Card>
  <Card 
    title="Parallel Workflow" 
    icon="arrows-split-up-and-left"
    href="https://github.com/lastmile-ai/mcp-agent/tree/main/examples/temporal/parallel.py"
  >
    Fan-out/fan-in pattern
  </Card>
  <Card 
    title="Orchestrator Workflow" 
    icon="users"
    href="https://github.com/lastmile-ai/mcp-agent/tree/main/examples/temporal/orchestrator.py"
  >
    Complex orchestration with Temporal
  </Card>
</CardGroup>

## Next Steps

<CardGroup cols={2}>
  <Card title="Deploy to Cloud" icon="cloud" href="/cloud/deployment-quickstart">
    Deploy Temporal workflows to MCP Agent Cloud
  </Card>
  <Card title="Workflow Patterns" icon="diagram-project" href="/workflows/overview">
    Explore workflow patterns with Temporal
  </Card>
  <Card title="Temporal Documentation" icon="book" href="https://docs.temporal.io">
    Deep dive into Temporal concepts
  </Card>
  <Card title="Production Guide" icon="rocket" href="https://docs.temporal.io/production-deployment">
    Temporal production deployment guide
  </Card>
</CardGroup>
